/*-
 * <<
 * DBus
 * ==
 * Copyright (C) 2016 - 2019 Bridata
 * ==
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * >>
 */


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.creditease.dbus.common.splitters;

import com.creditease.dbus.common.FullPullConstants;
import com.creditease.dbus.common.bean.DBConfiguration;
import com.creditease.dbus.common.format.DataDBInputSplit;
import com.creditease.dbus.common.format.InputSplit;
import com.creditease.dbus.enums.DbusDatasourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Fix bug by Dbus team 20161230
 * Implement DBSplitter over date/time values.
 * Make use of logic from IntegerSplitter, since date/time are just longs
 * in Java.
 */
public class DateSplitter extends IntegerSplitter {

    private Logger logger = LoggerFactory.getLogger(getClass());

    //Factor to convert the value to milliseconds.
    //For Split limit we take input as seconds. So we need to convert to milliseconds
    private static final long MS_IN_SEC = 1000L;

    public List<InputSplit> split(long numSplits,
                                  ResultSet results, String colName, DBConfiguration dbConf) throws SQLException {

        long minVal;
        long maxVal;

        int sqlDataType = results.getMetaData().getColumnType(1);
        logger.info("Oringinal sqlDataType is {}", sqlDataType);
        String datasourceType = dbConf.getString(DBConfiguration.DataSourceInfo.DS_TYPE);
        if (datasourceType.toUpperCase().equals(DbusDatasourceType.ORACLE.name()) && sqlDataType == Types.DATE) {
        /* 对于Oracle Types.DATE，将其强制转换成Types.TIMESTAMP。
        // Oracle的Date类型不仅保存年月日，还能保存时分秒甚至毫秒信息。
        // 但Oracle通过resultSet.getObject获取时间时，可能遭到截断，丢失时分秒（http://www.myexception.cn/database/1044846.html）
        // 例：对于 2008-06-13 13:48:21.0， 9i/11g返回2008-06-13 13:48:21.0；10g返回2008-06-13
        // 这里提到的版本指数据库服务器版本。同样版本的jdbc，连不同环境的Oracle服务器，同样的类型和数据，返回值不一样。
        // 提示说可用prop.setProperty("oracle.jdbc.V8Compatible" ,"true");解决。在有问题的环境验证，没解决问题。
        // 所以采取了强制转换为Types.TIMESTAMP的方式处理。*/
            sqlDataType = Types.TIMESTAMP;
        }
        logger.info("Transformed sqlDataType is {}", sqlDataType);

        minVal = resultSetColToLong(results, 1, sqlDataType);
        maxVal = resultSetColToLong(results, 2, sqlDataType);
        logger.info("minVal is {} maxVal is {}", minVal, maxVal);
        if (numSplits < 1L) {
            numSplits = 1L;
        }

        if (minVal == Long.MIN_VALUE && maxVal == Long.MIN_VALUE) {
            // The range of acceptable dates is NULL to NULL. Just create a single
            List<InputSplit> splits = new ArrayList<InputSplit>();
            splits.add(new DataDBInputSplit(sqlDataType, colName, FullPullConstants.QUERY_COND_IS_NULL, null, FullPullConstants.QUERY_COND_IS_NULL, null));
            return splits;
        }

        // For split size we are using seconds. So we need to convert to milliseconds.
        long splitLimit = dbConf.getSplitShardSize() * MS_IN_SEC;

        // Gather the split point integers
        List<Long> splitPoints = split(numSplits, splitLimit, minVal, maxVal);
        List<InputSplit> splits = new ArrayList<InputSplit>();

        // Turn the split points into a set of intervals.
        long start = splitPoints.get(0);
        Date startDate = longToDate(start, sqlDataType);
        if (sqlDataType == Types.TIMESTAMP) {
            // The lower bound's nanos value needs to match the actual lower-bound
            // nanos.
            try {
                ((java.sql.Timestamp) startDate).setNanos(
                        results.getTimestamp(1).getNanos());
            } catch (NullPointerException npe) {
                // If the lower bound was NULL, we'll get an NPE; just ignore it and
                // don't set nanos.
            }
        }

        for (int i = 1; i < splitPoints.size(); i++) {
            long end = splitPoints.get(i);
            Date endDate = longToDate(end, sqlDataType);

            if (i == splitPoints.size() - 1) {
                if (sqlDataType == Types.TIMESTAMP) {
                    // The upper bound's nanos value needs to match the actual
                    // upper-bound nanos.
                    try {
                        ((java.sql.Timestamp) endDate).setNanos(
                                results.getTimestamp(2).getNanos());
                    } catch (NullPointerException npe) {
                        // If the upper bound was NULL, we'll get an NPE; just ignore it
                        // and don't set nanos.
                    }
                }
                // This is the last one; use a closed interval.
                splits.add(new DataDBInputSplit(sqlDataType, colName, " >= ", startDate, " <= ", endDate));
            } else {
                // Normal open-interval case.
                splits.add(new DataDBInputSplit(sqlDataType, colName, " >= ", startDate, " < ", endDate));
            }

            start = end;
            startDate = endDate;
        }

        if (minVal == Long.MIN_VALUE || maxVal == Long.MIN_VALUE) {
            // Add an extra split to handle the null case that we saw.
            splits.add(new DataDBInputSplit(sqlDataType, colName, FullPullConstants.QUERY_COND_IS_NULL, null, FullPullConstants.QUERY_COND_IS_NULL, null));
        }

        return splits;
    }

    /**
     * Retrieve the value from the column in a type-appropriate manner and
     * return its timestamp since the epoch. If the column is null, then return
     * Long.MIN_VALUE.  This will cause a special split to be generated for the
     * NULL case, but may also cause poorly-balanced splits if most of the
     * actual dates are positive time since the epoch, etc.
     */
    private long resultSetColToLong(ResultSet rs, int colNum, int sqlDataType)
            throws SQLException {
        try {
            switch (sqlDataType) {
                case Types.DATE:
                    return rs.getDate(colNum).getTime();
                case Types.TIME:
                    return rs.getTime(colNum).getTime();
                case Types.TIMESTAMP:
                    return rs.getTimestamp(colNum).getTime();
                default:
                    throw new SQLException("Not a date-type field");
            }
        } catch (NullPointerException npe) {
            // null column. return minimum long value.
            logger.warn("Encountered a NULL date in the split column. "
                    + "Splits may be poorly balanced.");
            return Long.MIN_VALUE;
        }
    }

    /**
     * Parse the long-valued timestamp into the appropriate SQL date type.
     */
    public static Date longToDate(long val, int sqlDataType) {
        switch (sqlDataType) {
            case Types.DATE:
                return new java.sql.Date(val);
            case Types.TIME:
                return new java.sql.Time(val);
            case Types.TIMESTAMP:
                return new java.sql.Timestamp(val);
            default: // Shouldn't ever hit this case.
                return null;
        }
    }

    /**
     * Given a Date 'd', format it as a string for use in a SQL date
     * comparison operation.
     *
     * @param d the date to format.
     * @return the string representing this date in SQL with any appropriate
     * quotation characters, etc.
     */
    protected String dateToString(Date d) {
        return "'" + d.toString() + "'";
    }
}
